{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**_pySpark Basics: Subsetting Data_**"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_by Jeff Levy (jlevy@urban.org)_\n",
    "\n",
    "_Last Updated: 31 Jul 2017, Spark v2.1_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "_Abstract: This guide will go over filtering your data based on a specified criteria in order to get a subset_\n",
    "\n",
    "_Main operations used: `dtypes`, `take`, `show`, `select`, `drop`, `filter/where`, `sample`_"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "***"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We begin by loading some real data from an S3 bucket (the same data used in the *basics* tutorial), allowing pySpark to auto determine the schema, then take a look at its contents:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark.read.csv('s3://ui-spark-social-science-public/data/Performance_2015Q1.txt', header=False, inferSchema=True, sep='|')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('_c0', 'bigint'),\n",
       " ('_c1', 'string'),\n",
       " ('_c2', 'string'),\n",
       " ('_c3', 'double'),\n",
       " ('_c4', 'double'),\n",
       " ('_c5', 'int'),\n",
       " ('_c6', 'int'),\n",
       " ('_c7', 'int'),\n",
       " ('_c8', 'string'),\n",
       " ('_c9', 'int'),\n",
       " ('_c10', 'string'),\n",
       " ('_c11', 'string'),\n",
       " ('_c12', 'int'),\n",
       " ('_c13', 'string'),\n",
       " ('_c14', 'string'),\n",
       " ('_c15', 'string'),\n",
       " ('_c16', 'string'),\n",
       " ('_c17', 'string'),\n",
       " ('_c18', 'string'),\n",
       " ('_c19', 'string'),\n",
       " ('_c20', 'string'),\n",
       " ('_c21', 'string'),\n",
       " ('_c22', 'string'),\n",
       " ('_c23', 'string'),\n",
       " ('_c24', 'string'),\n",
       " ('_c25', 'string'),\n",
       " ('_c26', 'int'),\n",
       " ('_c27', 'string')]"
      ]
     },
     "execution_count": 2,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(_c0=100002091588, _c1=u'01/01/2015', _c2=u'OTHER', _c3=4.125, _c4=None, _c5=0, _c6=360, _c7=360, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None),\n",
       " Row(_c0=100002091588, _c1=u'02/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=1, _c6=359, _c7=359, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None),\n",
       " Row(_c0=100002091588, _c1=u'03/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=2, _c6=358, _c7=358, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None),\n",
       " Row(_c0=100002091588, _c1=u'04/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=3, _c6=357, _c7=357, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None),\n",
       " Row(_c0=100002091588, _c1=u'05/01/2015', _c2=None, _c3=4.125, _c4=None, _c5=4, _c6=356, _c7=356, _c8=u'01/2045', _c9=16740, _c10=u'0', _c11=u'N', _c12=None, _c13=None, _c14=None, _c15=None, _c16=None, _c17=None, _c18=None, _c19=None, _c20=None, _c21=None, _c22=None, _c23=None, _c24=None, _c25=None, _c26=None, _c27=None)]"
      ]
     },
     "execution_count": 3,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that this output looks messy because `take` doesn't format the results, it just shows you the row object with the format *column=value* for each column across a row.  It can be formatted nicely with `show()`, but due to the width of this data it will still look messy.  See below for `show()` in action.\n",
    "\n",
    "# Subsetting by Columns\n",
    "\n",
    "One of the simplest subsettings is done by selecting just a few of the columns:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+----------+-----+-----+\n",
      "|         _c0|       _c1|  _c3|  _c9|\n",
      "+------------+----------+-----+-----+\n",
      "|100002091588|01/01/2015|4.125|16740|\n",
      "|100002091588|02/01/2015|4.125|16740|\n",
      "|100002091588|03/01/2015|4.125|16740|\n",
      "|100002091588|04/01/2015|4.125|16740|\n",
      "|100002091588|05/01/2015|4.125|16740|\n",
      "+------------+----------+-----+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.sql.functions import col\n",
    "\n",
    "df_select = df.select(col('_c0'), col('_c1'), col('_c3'), col('_c9'))\n",
    "df_select.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that `show` defaults to showing the first 20 rows, but here we've specified only 5.  There is also a shortcut for this notation that does the same thing but is a little easier to read.  We show both because they both show up frequently in Spark resources:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+----------+-----+-----+\n",
      "|         _c0|       _c1|  _c3|  _c9|\n",
      "+------------+----------+-----+-----+\n",
      "|100002091588|01/01/2015|4.125|16740|\n",
      "|100002091588|02/01/2015|4.125|16740|\n",
      "|100002091588|03/01/2015|4.125|16740|\n",
      "|100002091588|04/01/2015|4.125|16740|\n",
      "|100002091588|05/01/2015|4.125|16740|\n",
      "+------------+----------+-----+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_select = df[['_c0', '_c1', '_c3', '_c9']]\n",
    "df_select.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Or we can do the same thing by dropping, which is convenient if we want to keep more columns than we want to drop:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_drop = df_select.drop(col('_c3'))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------+----------+-----+\n",
      "|         _c0|       _c1|  _c9|\n",
      "+------------+----------+-----+\n",
      "|100002091588|01/01/2015|16740|\n",
      "|100002091588|02/01/2015|16740|\n",
      "|100002091588|03/01/2015|16740|\n",
      "|100002091588|04/01/2015|16740|\n",
      "|100002091588|05/01/2015|16740|\n",
      "+------------+----------+-----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_drop.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Subsetting by Rows\n",
    "\n",
    "We often want to subset by rows also, for example by specifying a conditional.  Note that we have to use `.show()` at the end of `.describe()`, because **.describe() returns a new dataframe** with the information.  In many other programs, such as Stata, `describe` returns a formatted table; here, both `summary` and `C6` are actually column names."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----------------+\n",
      "|summary|              _c6|\n",
      "+-------+-----------------+\n",
      "|  count|          3526154|\n",
      "|   mean|354.7084951479714|\n",
      "| stddev| 4.01181251079202|\n",
      "|    min|              292|\n",
      "|    max|              480|\n",
      "+-------+-----------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.describe('_c6').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_sub = df.where(df['_c6'] < 358)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+\n",
      "|summary|               _c6|\n",
      "+-------+------------------+\n",
      "|  count|           2598037|\n",
      "|   mean|353.15604897081914|\n",
      "| stddev|3.5170213056883983|\n",
      "|    min|               292|\n",
      "|    max|               357|\n",
      "+-------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_sub.describe('_c6').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You can see from the `max` entry for `_c6` that we've cut it off at below 358 now.  Also note that **`where` is an alias for `filter`**; you can use them interchangeably in pySpark.\n",
    "\n",
    "We can repeat the same proceedure for multiple conditions and columns using standard logical operators:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_filter = df.where((df['_c6'] > 340) & (df['_c5'] < 4))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+------------------+\n",
      "|summary|               _c6|               _c5|\n",
      "+-------+------------------+------------------+\n",
      "|  count|           1254131|           1254131|\n",
      "|   mean|358.48713810598736| 1.474693632483369|\n",
      "| stddev| 1.378961910349754|1.2067831502138422|\n",
      "|    min|               341|                -1|\n",
      "|    max|               361|                 3|\n",
      "+-------+------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_filter.describe('_c6', '_c5').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Random Sampling\n",
    "\n",
    "And finally, you might want to take a random sample of rows.  This can be particularlly useful, for example, if your data is large enough to require more expensive clusters to be spun up to work with it all, and you want to use a smaller, less expensive cluster to work on a sample.  Once your code is completed, you can then spin up the more expensive cluster and simply apply your code to the full sample\n",
    "\n",
    "You can pass three arguments into sample: **the first is a boolean, which is True to sample with replacement, False without**.\n",
    "The second is the **fraction of the dataset to take**, in this case 5%, and the third is an **optional random seed**.  If\n",
    "you specify any integer here then someone else performing the same random operation that specifies the same seed\n",
    "will get the same result.  If no seed is passed then the exact random sampling can't be duplicated."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df_sample = df.sample(False, 0.05, 99)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+\n",
      "|summary|               _c6|\n",
      "+-------+------------------+\n",
      "|  count|            176015|\n",
      "|   mean|354.69058318893275|\n",
      "| stddev| 4.028614501676224|\n",
      "|    min|               293|\n",
      "|    max|               361|\n",
      "+-------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_sample.describe('_c6').show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you compare this to our original summary stats on unfiltered column C6 from above, you'll see it does a pretty good job maintaining the mean and stddev in a sample of only 5% of the data.  You can then write this to a new file in an S3 bucket and work with it instead of the whole data."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.12"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
